@CTTY commented Oct 10, 2025

Which issue does this PR close?

What changes are included in this PR?

New:

  • Added a new partitioning module with the PartitioningWriter trait
  • ClusteredDataWriter: optimized for pre-sorted data; requires input written in partition order
  • FanoutDataWriter: flexible writer that can handle data from any partition at any time

Modification:

  • (BREAKING) Modified DataFileWriterBuilder to support dynamic partition assignment
  • Updated DataFusion integration to use the new writer API

Are these changes tested?

Added unit tests

/// Build the iceberg writer.
async fn build(self) -> Result<Self::R>;
/// Build the iceberg writer for an optional partition key.
async fn build_with_partition(self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
@CTTY Oct 10, 2025


This is a breaking change. I believe this is necessary because:

  1. IcebergWriter is supposed to generate DataFiles that always hold a partition value, per the Iceberg spec.

  2. The existing code stores the partition value in the builder directly, making builder.clone() useless:

let builder = IcebergWriterBuilder::new(partition_A);
let writer_A = builder.clone().build();
... // write to partition A

// done with partition A; now we need to write to partition B
// this is wrong because partition value A is still stored in the builder
let writer_B = builder.clone().build();

An alternative is to add a new method, clone_with_partition(), but that would also be a breaking change, and it's less clean than build_with_partition().


/// A writer that writes data to a single partition at a time.
#[derive(Clone)]
pub struct ClusteredDataWriter<B: IcebergWriterBuilder> {
@CTTY Oct 10, 2025


ClusteredDataWriter and FanoutDataWriter are supposed to only work with DefaultInput and DefaultOutput.

I tried including generic I/O a few weeks ago and remember there were tons of tricky nuances, so I decided to just go with the default I/O types for now. For other I/O types (e.g. PositionalDeleteInput), we will need to add another implementation later.

Also, maybe we should name these DefaultClusteredWriter/DefaultFanoutWriter to avoid confusion, since they can also write equality deletes?


Development

Successfully merging this pull request may close these issues:

Implement fanout partitioned data writer.
